﻿#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "RtmpConnection.h"
#include "Logger.h"
#include "Util/String.h"
#include "Common/Define.h"
#include "Rtmp.h"
#include "Common/Config.h"
#include "Hook/MediaHook.h"

#include <sstream>

using namespace std;

// Constructs the connection on top of TcpConnection and keeps its own
// references to the event loop and the socket for later use.
RtmpConnection::RtmpConnection(const EventLoop::Ptr& loop, const Socket::Ptr& socket)
    : TcpConnection(loop, socket),
      _loop(loop),
      _socket(socket)
{
    logInfo << "RtmpConnection";
}

// Unregisters this connection from the media source it is bound to, provided
// the source is still alive. A publishing connection additionally releases
// the source after unregistering.
RtmpConnection::~RtmpConnection()
{
    logInfo << "~RtmpConnection";

    auto boundSource = _source.lock();
    if (!boundSource) {
        // Source already gone (or never bound): nothing to undo.
        return;
    }

    boundSource->delConnection(this);
    if (_isPublish) {
        boundSource->release();
    }
    // NOTE: the detach callback registered via addOnDetach() in handlePublish()
    // is not removed here.
}

void RtmpConnection::init()
{
    weak_ptr<RtmpConnection> wSelf = static_pointer_cast<RtmpConnection>(shared_from_this());
    _chunk.setOnRtmpChunk([wSelf](RtmpMessage msg){
        auto self = wSelf.lock();
        if (self) {
            self->onRtmpChunk(msg);
        }
    });
    _handshake = make_shared<RtmpHandshake>(RtmpHandshake::HANDSHAKE_C0C1);
    _handshake->setOnHandshake([wSelf](const StreamBuffer::Ptr &buffer){
        auto self = wSelf.lock();
        if (self) {
            self->send(buffer);
        }
    });
    
    _handshake->setOnRtmpChunk([wSelf](const StreamBuffer::Ptr &buffer){
        auto self = wSelf.lock();
        if (self) {
            self->_chunk.parse(buffer);
        }
    });
}

void RtmpConnection::close()
{
    TcpConnection::close();
}

// Management hook; currently only emits a log line.
void RtmpConnection::onManager()
{
    logInfo << "manager";
}

// Entry point for bytes received from the socket.
//
// Until the handshake reports completion the data is fed to the handshake
// parser; afterwards it goes to the chunk parser. `addr` and `len` are unused.
void RtmpConnection::onRead(const StreamBuffer::Ptr& buffer, struct sockaddr* addr, int len)
{
    if (!_handshake->isCompleted()) {
        logInfo << "parser handshake";
        _handshake->parse(buffer);
        return;
    }

    _chunk.parse(buffer);
}

void RtmpConnection::onError()
{
    close();
    logWarn << "get a error: ";
}

// Sends one buffer to the peer via the TcpConnection base class and returns
// whatever the base implementation returns.
ssize_t RtmpConnection::send(Buffer::Ptr pkt)
{
    const ssize_t written = TcpConnection::send(pkt);
    return written;
}

// Dispatches one RTMP message coming out of the chunk parser.
//
// Incomplete messages are ignored. Each supported message type is routed to
// its handler; when a handler reports failure the message type is logged.
// Changes versus the previous version:
//  - RTMP_SET_CHUNK_SIZE validates that the payload exists and holds the
//    4 bytes that are read, and rejects a zero chunk size, before touching
//    the chunk parser.
//  - The result of handleUserEvent() is no longer discarded.
void RtmpConnection::onRtmpChunk(RtmpMessage msg)
{
    if (!msg.isCompleted()) {
        return ;
    }

    bool ret = true;
    switch(msg.type_id)
    {
        case RTMP_VIDEO:
            ret = handleVideo(msg);
            break;
        case RTMP_AUDIO:
            ret = handleAudio(msg);
            break;
        case RTMP_INVOKE:
            logInfo << "handleInvoke";
            ret = handleInvoke(msg);
            break;
        case RTMP_NOTIFY:
            logInfo << "handleNotify";
            ret = handleNotify(msg);
            break;
        case RTMP_FLEX_MESSAGE:
            logInfo << "unsupported rtmp flex message";
            ret = false;
            break;
        case RTMP_SET_CHUNK_SIZE:
        {
            // The new size is read as a 32-bit big-endian value, so the
            // payload must carry at least 4 bytes.
            if (!msg.payload || msg.length < 4) {
                ret = false;
                break;
            }
            const auto chunkSize = readUint32BE(msg.payload.get());
            if (chunkSize == 0) {
                // A zero chunk size is not a usable value; keep the old one.
                ret = false;
                break;
            }
            _chunk.setInChunkSize(chunkSize);
            break;
        }
        case RTMP_BANDWIDTH_SIZE:
            break;
        case RTMP_FLASH_VIDEO:
            logInfo << "unsupported rtmp flash video";
            ret = false;
            break;
        case RTMP_ACK:
            break;
        case RTMP_ACK_SIZE:
            break;
        case RTMP_USER_EVENT:
            ret = handleUserEvent(msg);
            break;
        default:
            logInfo << "unkonw message type: " << (int)msg.type_id;
            break;
    }

    if (!ret) {
        logInfo << "msg.type_id: " << (int)msg.type_id;
    }

    return ;
}

// Handles an RTMP invoke (command) message.
//
// The command name is decoded first. Commands on stream id 0 ("connect",
// "createStream") are answered directly. Commands on the stream created by
// createStream ("publish"/"FCPublish", "play", "play2", "DeleteStream", ...)
// first decode three more AMF values, take the decoder's string as the stream
// name, append it to _tcUrl and re-parse the URL.
//
// publish and play are authorised asynchronously through MediaHook; in that
// case this function returns true immediately and the real work happens in
// the hook callback.
//
// Fix: the result of the 3-value decode is now checked before it is added to
// the running offset. Previously a negative return was added blindly and the
// resulting offset was then used for pointer arithmetic on the payload.
//
// @return false when AMF decoding fails or the invoked handler reports failure.
bool RtmpConnection::handleInvoke(RtmpMessage& rtmp_msg)
{
    bool ret  = true;
    _amfDecoder.reset();

    const char *payload = (const char *)rtmp_msg.payload.get();
    const int total = (int)rtmp_msg.length;

    int bytes_used = _amfDecoder.decode(payload, total, 1);
    if (bytes_used < 0) {
        return false;
    }

    std::string method = _amfDecoder.getString();
    logInfo << "method: " << method;
    logInfo << "rtmp_msg.stream_id: " << rtmp_msg.stream_id;
    logInfo << "_streamId: " << _streamId;
    if(rtmp_msg.stream_id == 0) {
        // Decode the remaining arguments; the handlers below read them from
        // the decoder. The offset is not used again in this branch.
        bytes_used += _amfDecoder.decode(payload + bytes_used, total - bytes_used);
        if(method == "connect") {
            ret = handleConnect();
        }
        else if(method == "createStream") {
            ret = handleCreateStream();
        }
    }
    else if(rtmp_msg.stream_id == _streamId) {
        const int used = _amfDecoder.decode(payload + bytes_used, total - bytes_used, 3);
        if (used < 0) {
            // Do not let a failed decode move the offset backwards.
            return false;
        }
        bytes_used += used;

        _streamName = _amfDecoder.getString();
        // NOTE(review): _tcUrl grows on every command received on this
        // stream, not only on publish/play — confirm this is intended.
        _tcUrl += "/" + _streamName;
        logInfo << "get a _tcUrl: " << _tcUrl;

        _urlParser.parse(_tcUrl);
        _path = _urlParser.path_;

        if(total > bytes_used) {
            // Trailing arguments, decoded on a best-effort basis; the offset
            // is not needed afterwards.
            _amfDecoder.decode(payload + bytes_used, total - bytes_used);
        }

        if(method == "publish" || method == "FCPublish") {
            PublishInfo info;
            info.protocol = _urlParser.protocol_;
            info.type = _urlParser.type_;
            info.uri = _urlParser.path_;
            info.vhost = _urlParser.vhost_;

            weak_ptr<RtmpConnection> wSelf = dynamic_pointer_cast<RtmpConnection>(shared_from_this());
            MediaHook::instance()->onPublish(info, [wSelf](const PublishResponse &rsp){
                auto self = wSelf.lock();
                if (!self) {
                    return ;
                }

                if (!rsp.authResult) {
                    self->onError();
                    return ;
                }

                self->handlePublish();
            });
            ret = true;
        }
        else if(method == "play") {
            PlayInfo info;
            info.protocol = _urlParser.protocol_;
            info.type = _urlParser.type_;
            info.uri = _urlParser.path_;
            info.vhost = _urlParser.vhost_;

            weak_ptr<RtmpConnection> wSelf = dynamic_pointer_cast<RtmpConnection>(shared_from_this());
            MediaHook::instance()->onPlay(info, [wSelf](const PlayResponse &rsp){
                auto self = wSelf.lock();
                if (!self) {
                    return ;
                }

                if (!rsp.authResult) {
                    self->onError();
                    return ;
                }

                self->handlePlay();
            });
            ret = true;
        }
        else if(method == "play2") {
            // NOTE(review): unlike "play", this path does not go through
            // MediaHook::onPlay — confirm whether play2 should be authorised.
            ret = handlePlay2();
        }
        else if(method == "DeleteStream") {
            ret = handleDeleteStream();
        } else if (method == "releaseStream") {
            // Accepted, no action.
        } else if (method == "seek") {
            // Accepted, no action.
        } else if (method == "pause") {
            // Accepted, no action.
        }
    }

    return ret;
}

// Handles an RTMP notify (data) message.
//
// Only the sequence "@setDataFrame", "onMetaData", <metadata> is acted upon:
// the decoded metadata objects are stored in _metaData. Any other notify is
// accepted and ignored.
//
// Fix: the payload offset is now accumulated across the two string decodes.
// Previously the second decode overwrote the offset with its own byte count,
// so the metadata decode started at the wrong position in the payload.
//
// @return false only when one of the two leading string decodes fails.
bool RtmpConnection::handleNotify(RtmpMessage& rtmp_msg)
{
    const char *payload = (const char *)rtmp_msg.payload.get();
    const int total = (int)rtmp_msg.length;

    _amfDecoder.reset();
    int offset = _amfDecoder.decode(payload, total, 1);
    if (offset < 0) {
        return false;
    }

    const std::string command = _amfDecoder.getString();
    logInfo << "_amfDecoder.getString(): " << command;
    if (command != "@setDataFrame") {
        return true;
    }

    _amfDecoder.reset();
    const int used = _amfDecoder.decode(payload + offset, total - offset, 1);
    if (used < 0) {
        return false;
    }
    // Running offset = bytes of the first value + bytes of the second value.
    offset += used;

    const std::string dataName = _amfDecoder.getString();
    logInfo << "_amfDecoder.getString(): " << dataName;
    if (dataName == "onMetaData") {
        _amfDecoder.decode(payload + offset, total - offset);
        _metaData = _amfDecoder.getObjects();
    }

    return true;
}

// Validates an RTMP user-control message.
//
// The payload starts with a 16-bit big-endian event type. Event types 0, 1,
// 2, 6 and 7 must be followed by at least 4 more bytes. (The previous code
// labelled 0/1/2 as play/pause/stop and 6/7 as ping request/response.) No
// action is taken for any event and no reply is sent; the function only
// reports whether the message is well-formed.
//
// Fix: the payload pointer and the 2-byte header are now checked before
// reading. Previously a payload shorter than 2 bytes was read out of bounds
// and `length -= 2` wrapped around, which defeated the later `length < 4`
// checks. The unused locals that held the decoded values were removed.
//
// @return false when the message is too short for its event type.
bool RtmpConnection::handleUserEvent(RtmpMessage& rtmp_msg)
{
    char *payload = rtmp_msg.payload.get();
    uint32_t length = rtmp_msg.length;

    if (!payload || length < 2) {
        return false;
    }

    uint16_t event_type = readUint16BE(payload);
    length -= 2;

    switch (event_type) {
        case 0:
        case 1:
        case 2:
        case 6: //CONTROL_PING_REQUEST
        case 7: //CONTROL_PING_RESPONSE
            // Each of these carries a 32-bit value after the event type.
            if (length < 4) {
                return false;
            }
            break;
        default:
            break;
    }

    return true;
}

// Handles one RTMP video message from a publisher.
//
// The first payload byte is split into frame type (high nibble) and codec id
// (low nibble). On the first video message a decode track is created for the
// codec and added to the source; if track creation fails, video is disabled
// for the rest of the connection. A frame with frame type 1 whose second byte
// is 0 is treated as the AVC sequence header: it is copied, handed to the
// source and to the track as the config frame, and the source is marked
// ready. Every message is finally forwarded to the decode track.
//
// The message is moved out of `rtmp_msg`; the caller must not rely on its
// contents afterwards.
//
// Fix: empty or null payloads are rejected, and the second byte is read only
// when the payload has at least two bytes. Previously both bytes were read
// unconditionally. The never-read `type` local was removed.
//
// @return false when video is disabled, the source is gone (the connection is
//         closed in that case), the payload is empty, or the track cannot be
//         created.
bool RtmpConnection::handleVideo(RtmpMessage& rtmp_msg)
{
    if (!_validVideoTrack) {
        return false;
    }
    auto rtmpSrc = _source.lock();
    if (!rtmpSrc) {
        close();
        return false;
    }

    const uint32_t length = rtmp_msg.length;
    if (!rtmp_msg.payload || length < 1) {
        return false;
    }

    const uint8_t firstByte = ((const uint8_t *)rtmp_msg.payload.get())[0];
    const uint8_t frame_type = (firstByte >> 4) & 0x0f;
    const uint8_t codec_id = firstByte & 0x0f;

    if (!_rtmpVideoDecodeTrack) {
        _rtmpVideoDecodeTrack = make_shared<RtmpDecodeTrack>(VideoTrackType);
        if (_rtmpVideoDecodeTrack->createTrackInfo(VideoTrackType, codec_id) != 0) {
            _validVideoTrack = false;
            return false;
        }
        rtmpSrc->addTrack(_rtmpVideoDecodeTrack);
    }

    auto msg = make_shared<RtmpMessage>(std::move(rtmp_msg));
    // Read through the new owner of the payload from here on.
    const uint8_t *payload = (const uint8_t *)msg->payload.get();

    if (frame_type == 1 && length > 1 && payload[1] == 0) {
        _avcHeaderSize = length;
        _avcHeader.reset(new char[length], std::default_delete<char[]>());
        memcpy(_avcHeader.get(), msg->payload.get(), length);
        rtmpSrc->setAvcHeader(_avcHeader, _avcHeaderSize);
        _rtmpVideoDecodeTrack->setConfigFrame(msg);
        rtmpSrc->onReady();
    }

    msg->trackIndex_ = VideoTrackType;
    _rtmpVideoDecodeTrack->onRtmpPacket(msg);

    return true;
}

// Handles one RTMP audio message from a publisher.
//
// The high nibble of the first payload byte is used as the sound format. On
// the first audio message a decode track is created for that format and
// added to the source; if track creation fails, audio is disabled for the
// rest of the connection. An AAC message whose second byte is 0 is treated as
// the AAC sequence header: it is copied, handed to the source and set as the
// track's config frame. Every message is finally forwarded to the decode
// track.
//
// The message is moved out of `rtmp_msg`; the caller must not rely on its
// contents afterwards.
//
// Fix: empty or null payloads are rejected, and the second byte is read only
// when the payload has at least two bytes. Previously both bytes were read
// unconditionally. The never-read `type` and `codec_id` locals were removed.
//
// @return false when audio is disabled, the source is gone (the connection is
//         closed in that case), the payload is empty, or the track cannot be
//         created.
bool RtmpConnection::handleAudio(RtmpMessage& rtmp_msg)
{
    if (!_validAudioTrack) {
        return false;
    }
    auto rtmpSrc = _source.lock();
    if (!rtmpSrc) {
        close();
        return false;
    }

    if (!rtmp_msg.payload || rtmp_msg.length < 1) {
        return false;
    }

    const uint8_t firstByte = ((const uint8_t *)rtmp_msg.payload.get())[0];
    const uint8_t sound_format = (firstByte >> 4) & 0x0f;

    if (!_rtmpAudioDecodeTrack) {
        _rtmpAudioDecodeTrack = make_shared<RtmpDecodeTrack>(AudioTrackType);
        if (_rtmpAudioDecodeTrack->createTrackInfo(AudioTrackType, sound_format) != 0) {
            _validAudioTrack = false;
            return false;
        }
        rtmpSrc->addTrack(_rtmpAudioDecodeTrack);
    }

    auto msg = make_shared<RtmpMessage>(std::move(rtmp_msg));
    // Read through the new owner of the payload from here on.
    const uint8_t *payload = (const uint8_t *)msg->payload.get();

    if (sound_format == RTMP_CODEC_ID_AAC && msg->length > 1 && payload[1] == 0) {
        _aacHeaderSize = msg->length;
        _aacHeader.reset(new char[msg->length], std::default_delete<char[]>());
        memcpy(_aacHeader.get(), msg->payload.get(), msg->length);
        rtmpSrc->setAacHeader(_aacHeader, _aacHeaderSize);
        _rtmpAudioDecodeTrack->setConfigFrame(msg);
    }

    msg->trackIndex_ = AudioTrackType;
    _rtmpAudioDecodeTrack->onRtmpPacket(msg);

    return true;
}

bool RtmpConnection::handleConnect()
{
    if(!_amfDecoder.hasObject("app")) {
        return false;
    }

    AmfObject amfObj = _amfDecoder.getObject("app");
    _app = amfObj.amfString;
    if(_app == "") {
        return false;
    }

    if(!_amfDecoder.hasObject("tcUrl")) {
        return false;
    }

    amfObj = _amfDecoder.getObject("tcUrl");
    _tcUrl = amfObj.amfString;
    if(_tcUrl == "") {
        _tcUrl = string(PROTOCOL_RTMP) + "://" + DEFAULT_VHOST + "/" + _app;
    }

    sendAcknowledgement();
    setPeerBandwidth();   
    setChunkSize();

    AmfObjects objects;    
    _amfEncoder.reset();
    _amfEncoder.encodeString("_result", 7);
    _amfEncoder.encodeNumber(_amfDecoder.getNumber());

    objects["fmsVer"] = AmfObject(std::string("FMS/4,5,0,297"));
    objects["capabilities"] = AmfObject(255.0);
    objects["mode"] = AmfObject(1.0);
    _amfEncoder.encodeObjects(objects);
    objects.clear();
    objects["level"] = AmfObject(std::string("status"));
    objects["code"] = AmfObject(std::string("NetConnection.Connect.Success"));
    objects["description"] = AmfObject(std::string("Connection succeeded."));
    objects["objectEncoding"] = AmfObject(0.0);
    _amfEncoder.encodeObjects(objects);  

    sendInvokeMessage(RTMP_CHUNK_INVOKE_ID, _amfEncoder.data(), _amfEncoder.size());
    return true;
}

bool RtmpConnection::handleCreateStream()
{ 
	int streamId = _chunk.getStreamId();

	AmfObjects objects;
	_amfEncoder.reset();
	_amfEncoder.encodeString("_result", 7);
	_amfEncoder.encodeNumber(_amfDecoder.getNumber());
	_amfEncoder.encodeObjects(objects);
	_amfEncoder.encodeNumber(streamId);

	sendInvokeMessage(RTMP_CHUNK_INVOKE_ID, _amfEncoder.data(), _amfEncoder.size());
	_streamId = streamId;
	return true;
}

bool RtmpConnection::handlePublish()
{
    logInfo << "publish: " << _path;
    //LOG_INFO("[Publish] app: %s, stream name: %s, stream path: %s\n", app_.c_str(), stream_name_.c_str(), stream_path_.c_str());

	// auto server = rtmp_server_.lock();
	// if (!server) {
	// 	return false;
	// }

    AmfObjects objects; 
    _amfEncoder.reset();
    _amfEncoder.encodeString("onStatus", 8);
    _amfEncoder.encodeNumber(0);
    _amfEncoder.encodeObjects(objects);

    bool is_error = false;

    do {
        if (_source.lock() || _isPublish) {
            is_error = true;
            objects["level"] = AmfObject(std::string("error"));
            objects["code"] = AmfObject(std::string("NetStream.Publish.BadName"));
            objects["description"] = AmfObject(std::string("Stream already publishing."));
            break;
        }

        // _urlParser.path_ = _path;
        // _urlParser.type_ = DEFAULT_TYPE;
        // _urlParser.protocol_ = "rtmp";
        // _urlParser.vhost_ = DEFAULT_VHOST;
        auto source = MediaSource::getOrCreate(_urlParser.path_, _urlParser.vhost_
                            , _urlParser.protocol_, _urlParser.type_
                            , [this](){
                                return make_shared<RtmpMediaSource>(_urlParser, _loop);
                            });
        if (!source) {
            is_error = true;
            objects["level"] = AmfObject(std::string("error"));
            objects["code"] = AmfObject(std::string("NetStream.Publish.BadName"));
            objects["description"] = AmfObject(std::string("Stream already publishing."));
            break;
        }

        auto rtmpSrc = dynamic_pointer_cast<RtmpMediaSource>(source);
        // rtmpSrc->setSdp(_parser._content);
        rtmpSrc->setOrigin();
        rtmpSrc->setOriginSocket(_socket);
        weak_ptr<RtmpConnection> wSelf = dynamic_pointer_cast<RtmpConnection>(shared_from_this());
        rtmpSrc->addOnDetach(this, [wSelf](){
            auto self = wSelf.lock();
            if (!self) {
                return ;
            }
            self->close();
        });

        _source = rtmpSrc;
        // _rtmpDecodeTrack = make_shared<RtmpDecodeTrack>(0);
        // rtmpSrc->addTrack(_rtmpDecodeTrack);
        // rtmpSrc->setStatus(AVAILABLE);

        /*if(server->HasPublisher(stream_path_)) {
            is_error = true;
            objects["level"] = AmfObject(std::string("error"));
            objects["code"] = AmfObject(std::string("NetStream.Publish.BadName"));
            objects["description"] = AmfObject(std::string("Stream already publishing."));
        }
        // else */
        // if(_isPublish) {
        //     is_error = true;
        //     objects["level"] = AmfObject(std::string("error"));
        //     objects["code"] = AmfObject(std::string("NetStream.Publish.BadConnection"));
        //     objects["description"] = AmfObject(std::string("Connection already publishing."));
        // }
        // /* else if(0)  {
        //     // 认证处理 
        // } */
        // else {
            objects["level"] = AmfObject(std::string("status"));
            objects["code"] = AmfObject(std::string("NetStream.Publish.Start"));
            objects["description"] = AmfObject(std::string("Start publising."));
            // server->AddSession(stream_path_);
            // rtmp_session_ = server->GetSession(stream_path_);

            // if (server) {
            // 	server->NotifyEvent("publish.start", stream_path_);
            // }
        // }
    } while (0);

    _amfEncoder.encodeObjects(objects);     
    sendInvokeMessage(RTMP_CHUNK_INVOKE_ID, _amfEncoder.data(), _amfEncoder.size());

    if(is_error) {
        close();
    }
    else {
        // connection_state_ = START_PUBLISH;
		_isPublish = true;
    }

    // auto session = rtmp_session_.lock();
    // if(session) {
	// 	session->SetGopCache(max_gop_cache_len_);
	// 	session->AddSink(std::dynamic_pointer_cast<RtmpSink>(shared_from_this()));
    // }        

    return true;
}

// Completes a play request once the media source lookup has finished.
//
// If `src` is null or not an RtmpMediaSource, a
// NetStream.Play.StreamNotFound status is sent and the connection is closed.
// Otherwise the source is stored in _source and the peer receives, in order:
// Play.Reset, Play.Start, |RtmpSampleAccess, the source's metadata (when
// non-empty) and the source's AVC header (when present). Finally, unless a
// reader already exists, a periodic bitrate timer is registered and a ring
// reader is attached to the source; the reader forwards every packet to the
// peer.
//
// Changes versus the previous version:
//  - The read callback iterates the delivered packet list by reference
//    instead of copying the whole list on every delivery.
//  - The read callback no longer reads payload[0] into unused locals, which
//    was an out-of-bounds read for packets with an empty payload.
//  - The source is cast to RtmpMediaSource once instead of twice.
void RtmpConnection::responsePlay(const MediaSource::Ptr &src)
{
    AmfObjects objects;

    // Casting a null pointer yields null, so this also covers `!src`.
    auto rtmpSrc = dynamic_pointer_cast<RtmpMediaSource>(src);
    if (!rtmpSrc) {
        logInfo << "No such stream";
        _amfEncoder.reset();
        _amfEncoder.encodeString("onStatus", 8);
        _amfEncoder.encodeNumber(0);
        objects["level"] = AmfObject(std::string("error"));
        objects["code"] = AmfObject(std::string("NetStream.Play.StreamNotFound"));
        objects["description"] = AmfObject(std::string("No such stream."));
        _amfEncoder.encodeObjects(objects);
        sendInvokeMessage(RTMP_CHUNK_INVOKE_ID, _amfEncoder.data(), _amfEncoder.size());
        close();
        return;
    }

    _source = rtmpSrc;
    logInfo << "Resetting and playing stream";

    // An empty object is encoded first, then the status object.
    _amfEncoder.reset();
    _amfEncoder.encodeString("onStatus", 8);
    _amfEncoder.encodeNumber(0);
    _amfEncoder.encodeObjects(objects);
    objects["level"] = AmfObject(std::string("status"));
    objects["code"] = AmfObject(std::string("NetStream.Play.Reset"));
    objects["description"] = AmfObject(std::string("Resetting and playing stream."));
    _amfEncoder.encodeObjects(objects);
    if(!sendInvokeMessage(RTMP_CHUNK_INVOKE_ID, _amfEncoder.data(), _amfEncoder.size())) {
        return ;
    }

    logInfo << "Started playing";
    objects.clear();
    _amfEncoder.reset();
    _amfEncoder.encodeString("onStatus", 8);
    _amfEncoder.encodeNumber(0);
    _amfEncoder.encodeObjects(objects);
    objects["level"] = AmfObject(std::string("status"));
    objects["code"] = AmfObject(std::string("NetStream.Play.Start"));
    objects["description"] = AmfObject(std::string("Started playing."));
    _amfEncoder.encodeObjects(objects);
    if(!sendInvokeMessage(RTMP_CHUNK_INVOKE_ID, _amfEncoder.data(), _amfEncoder.size())) {
        return ;
    }

    logInfo << "RtmpSampleAccess";
    _amfEncoder.reset();
    _amfEncoder.encodeString("|RtmpSampleAccess", 17);
    _amfEncoder.encodeBoolean(true);
    _amfEncoder.encodeBoolean(true);
    if(!sendNotifyMessage(RTMP_CHUNK_DATA_ID, _amfEncoder.data(), _amfEncoder.size())) {
        return ;
    }

    _isPlay = true;
    logInfo << "sendMetaData";
    auto metadata = rtmpSrc->getMetadata();
    if (metadata.size() > 0) {
        sendMetaData(metadata);
    }

    if (rtmpSrc->_avcHeader) {
        logInfo << "send a avc header";
        RtmpMessage rtmp_msg;
        rtmp_msg.abs_timestamp = 0;
        rtmp_msg.stream_id = _streamId;
        rtmp_msg.payload = rtmpSrc->_avcHeader;
        rtmp_msg.length = rtmpSrc->_avcHeaderSize;

        rtmp_msg.type_id = RTMP_VIDEO;
        sendRtmpChunks(RTMP_CHUNK_VIDEO_ID, rtmp_msg);
    }

    if (!_playReader) {
        logInfo << "set _playReader";
        // Statistics interval, shared by all connections; the listener
        // refreshes it when the configuration changes. Zero falls back to 5000.
        static int interval = Config::instance()->getAndListen([](const json &config){
            interval = Config::instance()->get("Rtmp", "Server", "Server1", "interval");
            if (interval == 0) {
                interval = 5000;
            }
        }, "Rtmp", "Server", "Server1", "interval");

        if (interval == 0) {
            interval = 5000;
        }
        weak_ptr<RtmpConnection> wSelf = static_pointer_cast<RtmpConnection>(shared_from_this());

        // Periodically turns the bytes sent during the interval into a rate.
        // Returning 0 once the connection is gone; otherwise the interval.
        _loop->addTimerTask(interval, [wSelf](){
            auto self = wSelf.lock();
            if (!self) {
                return 0;
            }
            self->_lastBitrate = self->_intervalSendBytes / (interval / 1000.0);
            self->_intervalSendBytes = 0;

            return interval;
        }, nullptr);

        _playReader = rtmpSrc->getRing()->attach(_loop, true);
        _playReader->setGetInfoCB([wSelf]() {
            auto self = wSelf.lock();
            ClientInfo ret;
            if (!self) {
                return ret;
            }
            ret.ip_ = self->_socket->getLocalIp();
            ret.port_ = self->_socket->getLocalPort();
            ret.protocol_ = PROTOCOL_RTMP;
            ret.bitrate_ = self->_lastBitrate;
            ret.close_ = [wSelf](){
                auto self = wSelf.lock();
                if (self) {
                    self->onError();
                }
            };
            return ret;
        });
        _playReader->setDetachCB([wSelf]() {
            auto strong_self = wSelf.lock();
            if (!strong_self) {
                return;
            }
            strong_self->close();
        });
        logInfo << "setReadCB =================";
        _playReader->setReadCB([wSelf](const RtmpMediaSource::RingDataType &pack) {
            auto self = wSelf.lock();
            if (!self) {
                return;
            }
            logInfo << "send rtmp msg";
            // Iterate the delivered list in place; no copy is needed.
            const auto& pktList = *(pack.get());
            for (const auto& pkt : pktList) {
                self->_totalSendBytes += pkt->length;
                self->_intervalSendBytes += pkt->length;

                self->sendRtmpChunks(pkt->csid, *pkt);
            }
        });
    }
}

bool RtmpConnection::handlePlay()
{
	//LOG_INFO("[Play] app: %s, stream name: %s, stream path: %s\n", app_.c_str(), stream_name_.c_str(), stream_path_.c_str());

	// auto server = rtmp_server_.lock();
	// if (!server) {
	// 	return false;
	// }

    // _urlParser.path_ = _path;
    // _urlParser.type_ = DEFAULT_TYPE;
    // _urlParser.protocol_ = "rtmp";
    // _urlParser.vhost_ = DEFAULT_VHOST;

    weak_ptr<RtmpConnection> wSelf = static_pointer_cast<RtmpConnection>(shared_from_this());
    MediaSource::getOrCreateAsync(_urlParser.path_, _urlParser.vhost_, _urlParser.protocol_, _urlParser.type_, 
    [wSelf](const MediaSource::Ptr &src){
        logInfo << "get a src";
        auto self = wSelf.lock();
        if (!self) {
            return ;
        }

        self->_loop->async([wSelf, src](){
            auto self = wSelf.lock();
            if (!self) {
                return ;
            }

            self->responsePlay(src);
        }, true);
    }, 
    [wSelf]() -> MediaSource::Ptr {
        auto self = wSelf.lock();
        if (!self) {
            return nullptr;
        }
        return make_shared<RtmpMediaSource>(self->_urlParser, nullptr, true);
    }, this);
    
             
    // connection_state_ = START_PLAY; 
    
	// rtmp_session_ = server->GetSession(stream_path_);
	// auto session = rtmp_session_.lock();
    // if(session) {
	// 	session->AddSink(std::dynamic_pointer_cast<RtmpSink>(shared_from_this()));
    // }  
    
	// if (server) {
	// 	server->NotifyEvent("play.start", stream_path_);
	// }

    return true;
}

// Handles the "play2" command by running the normal play path.
//
// Fix: the result of handlePlay() is now returned. Previously this always
// returned false, so every play2 was reported as a failed invoke by the
// caller even when the play request had been started.
bool RtmpConnection::handlePlay2()
{
    return handlePlay();
}

// Handles the "DeleteStream" command: when a stream path has been set, the
// chunk layer state is cleared. Always returns true.
bool RtmpConnection::handleDeleteStream()
{
    const bool hasPath = !(_path == "");
    if (hasPath) {
        _chunk.clear();
    }

    return true;
}

// Sends an "onMetaData" notify message carrying `meta_data` as an ECMA array.
//
// @return false when `meta_data` is empty or the notify message is not sent.
bool RtmpConnection::sendMetaData(AmfObjects meta_data)
{
    if (meta_data.size() == 0) {
        return false;
    }

    _amfEncoder.reset();
    _amfEncoder.encodeString("onMetaData", 10);
    _amfEncoder.encodeECMA(meta_data);

    const bool sent = this->sendNotifyMessage(RTMP_CHUNK_DATA_ID, _amfEncoder.data(), _amfEncoder.size());
    return sent;
}

void RtmpConnection::setPeerBandwidth()
{
    std::shared_ptr<char> data(new char[5], std::default_delete<char[]>());
    writeUint32BE(data.get(), 5000000);
    data.get()[4] = 2;
    RtmpMessage rtmp_msg;
    rtmp_msg.type_id = RTMP_BANDWIDTH_SIZE;
    rtmp_msg.payload = data;
    rtmp_msg.length = 5;
    sendRtmpChunks(RTMP_CHUNK_CONTROL_ID, rtmp_msg);
}

void RtmpConnection::sendAcknowledgement()
{
    std::shared_ptr<char> data(new char[4], std::default_delete<char[]>());
    writeUint32BE(data.get(), 5000000);

    RtmpMessage rtmp_msg;
    rtmp_msg.type_id = RTMP_ACK_SIZE;
    rtmp_msg.payload = data;
    rtmp_msg.length = 4;
    sendRtmpChunks(RTMP_CHUNK_CONTROL_ID, rtmp_msg);
}

void RtmpConnection::setChunkSize()
{
	_chunk.setOutChunkSize(5000000);
    std::shared_ptr<char> data(new char[4], std::default_delete<char[]>());
    writeUint32BE((char*)data.get(), 5000000);

    RtmpMessage rtmp_msg;
    rtmp_msg.type_id = RTMP_SET_CHUNK_SIZE;
    rtmp_msg.payload = data;
    rtmp_msg.length = 4;
    sendRtmpChunks(RTMP_CHUNK_CONTROL_ID, rtmp_msg);
}

bool RtmpConnection::sendInvokeMessage(uint32_t csid, std::shared_ptr<char> payload, uint32_t payload_size)
{
    // if(this->IsClosed()) {
    //     return false;
    // }

    RtmpMessage rtmp_msg;
    rtmp_msg.type_id = RTMP_INVOKE;
    rtmp_msg.timestamp = 0;
    rtmp_msg.stream_id = _streamId;
    rtmp_msg.payload = payload;
    rtmp_msg.length = payload_size; 
    sendRtmpChunks(csid, rtmp_msg);  
    return true;
}

bool RtmpConnection::sendNotifyMessage(uint32_t csid, std::shared_ptr<char> payload, uint32_t payload_size)
{
    // if(this->IsClosed()) {
    //     return false;
    // }

    RtmpMessage rtmp_msg;
    rtmp_msg.type_id = RTMP_NOTIFY;
    rtmp_msg.timestamp = 0;
    rtmp_msg.stream_id = _streamId;
    rtmp_msg.payload = payload;
    rtmp_msg.length = payload_size; 
    sendRtmpChunks(csid, rtmp_msg);  
    return true;
}

// Reports whether a video payload is a key frame: frame type (high nibble of
// the first byte) equal to 1 and codec id (low nibble) equal to H264 or H265.
//
// Fix: a null or empty payload now yields false instead of dereferencing the
// pointer; `payload_size` was previously ignored.
bool RtmpConnection::isKeyFrame(std::shared_ptr<char> payload, uint32_t payload_size)
{
    if (!payload || payload_size < 1) {
        return false;
    }

    const uint8_t firstByte = (uint8_t)payload.get()[0];
    const uint8_t frame_type = (firstByte >> 4) & 0x0f;
    const uint8_t codec_id = firstByte & 0x0f;
    return (frame_type == 1 && (codec_id == RTMP_CODEC_ID_H264 || codec_id == RTMP_CODEC_ID_H265));
}

// Serialises `rtmp_msg` into chunks for chunk stream `csid` and sends the
// result to the peer. Nothing is sent when the chunker produces no bytes.
//
// The scratch buffer is sized as the message length, plus 5 bytes for every
// 5000000 bytes of message, plus 1024 bytes. The chunker works on a copy of
// the message, so the caller's object is left untouched.
void RtmpConnection::sendRtmpChunks(uint32_t csid, RtmpMessage& rtmp_msg)
{
    const uint32_t scratchSize = rtmp_msg.length + (rtmp_msg.length / 5000000) * 5 + 1024;
    std::shared_ptr<char> scratch(new char[scratchSize], std::default_delete<char[]>());

    auto workingCopy = rtmp_msg;
    const int produced = _chunk.createChunk(csid, workingCopy, scratch.get(), scratchSize);
    if (produced <= 0) {
        return;
    }

    auto outBuffer = StreamBuffer::create();
    outBuffer->assign(scratch.get(), produced);
    this->send(outBuffer);
}


